/****************************************************
 * 创建人：@author fengxin    
 * 创建时间: 2019/12/18/16:44
 * 项目名称: risk
 * 文件名称: CoPair.java
 * 文件描述: 
 *
 * All rights Reserved, Designed By 投资交易团队
 * @Copyright:2016-2019
 *
 ********************************************************/
package com.learn.zmq.pair;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * Package: com.learn.zmq.pair
 * Class: CoPair
 * Description: usage example for {@code SocketType.PAIR}.
 * The PAIR pattern can be used over the tcp, ipc and inproc transports;
 * this example uses inproc only.
 *
 * <p>Two threads ping-pong an incrementing counter: {@code PairUp} sends a
 * number over {@code inproc://down}, {@code PairDown} increments it and sends
 * it back over {@code inproc://up}, and {@code PairUp} increments the reply
 * before sending again.
 *
 * Author: fengxin
 * Created: 2019/12/18 16:44
 */
public class CoPair {

    /** Endpoint bound by {@code PairDown}; carries messages from up to down. */
    private static final String DOWN_ENDPOINT = "inproc://down";

    /** Endpoint bound by {@code PairUp}; carries replies from down to up. */
    private static final String UP_ENDPOINT = "inproc://up";

    /** Pause, in milliseconds, that {@code PairUp} takes between sending and receiving. */
    private static final long SEND_PAUSE_MILLIS = 1000L;

    /** Encodes a counter value as its decimal text in the ZMQ default charset. */
    private static byte[] encode(int value)
    {
        return Integer.toString(value).getBytes(ZMQ.CHARSET);
    }

    /** Decodes a received frame into text using the ZMQ default charset. */
    private static String decode(byte[] frame)
    {
        return new String(frame, ZMQ.CHARSET);
    }

    /**
     * The "up" side of the pair: sends first, then waits for the reply.
     * Runs until the thread is interrupted or no reply frame is delivered.
     */
    private static class PairUp extends Thread
    {
        private final ZContext context;

        private PairUp(ZContext context)
        {
            this.context = context;
        }

        @Override
        public void run()
        {
            int msgNo = 0;

            // Sockets are created on the thread that uses them.
            ZMQ.Socket xmitter = context.createSocket(SocketType.PAIR);
            xmitter.connect(DOWN_ENDPOINT);

            ZMQ.Socket receiver = context.createSocket(SocketType.PAIR);
            receiver.bind(UP_ENDPOINT);

            while (!Thread.currentThread().isInterrupted()) {

                // up: send first, then receive
                System.out.println("send " + msgNo + " down");
                xmitter.send(encode(msgNo), 0);

                try {
                    Thread.sleep(SEND_PAUSE_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so callers can observe the interruption, then stop.
                    Thread.currentThread().interrupt();
                    break;
                }

                // Wait for the reply from down.
                byte[] reply = receiver.recv(0);
                if (reply == null) {
                    // No frame was delivered; stop instead of failing on a null payload.
                    break;
                }
                String msg = decode(reply);
                System.out.println("get " + msg + " from down");
                msgNo = Integer.parseInt(msg) + 1;
            }
        }

    }


    /**
     * The "down" side of the pair: receives first, then replies with the
     * received number plus one. Runs until the thread is interrupted or no
     * frame is delivered.
     */
    private static class PairDown extends Thread
    {
        private final ZContext context;

        private PairDown(ZContext context)
        {
            this.context = context;
        }

        @Override
        public void run()
        {

            // Bind the endpoint on which messages from up arrive.
            ZMQ.Socket receiver = context.createSocket(SocketType.PAIR);
            receiver.bind(DOWN_ENDPOINT);

            // Connect the socket used to reply to up.
            ZMQ.Socket xmitter = context.createSocket(SocketType.PAIR);
            xmitter.connect(UP_ENDPOINT);

            while (!Thread.currentThread().isInterrupted()) {

                // down: receive first, then send
                byte[] request = receiver.recv(0);
                if (request == null) {
                    // No frame was delivered; stop instead of failing on a null payload.
                    break;
                }
                String msg = decode(request);
                System.out.println("get " + msg + " from up");
                int msgNo = Integer.parseInt(msg) + 1;

                System.out.println("reply " + msgNo + " up");
                xmitter.send(encode(msgNo), 0);
            }
        }

    }

    /**
     * Starts the down and up threads on one shared context and waits for both.
     * The context is closed by try-with-resources once both threads have ended.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {

            Thread down = new PairDown(context);
            Thread up = new PairUp(context);

            down.start();
            up.start();

            down.join();
            up.join();

        }
    }


}
